Source code for hysop.numerics.remesh.kernel_generator

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import hashlib
import gzip
import numpy as np
import scipy as sp
import sympy as sm
from scipy import interpolate

from hysop.tools.io_utils import IO
from hysop.tools.numerics import mpq, mpfr, mpqize
from hysop.tools.cache import load_data_from_cache, update_cache
from hysop.numerics.odesolvers.runge_kutta import ExplicitRungeKutta


[docs] class Kernel: def __init__(self, register=False, verbose=False, split_polys=False, **kargs): """ Use SymmetricKernelGenerator or KernelGenerator to generate a kernel. Do not call directly. split_polys: Split each polynomial in two, to gain precision. """ dic = kargs varnames = ["n", "r", "deg", "Ms", "Mh", "H", "remesh", "P"] for var in varnames: setattr(self, var, dic[var]) self.split_polys = split_polys self.varnames = varnames if register: self._register(dic) self._build(verbose, split_polys) self._hashable = True
[docs] @classmethod def hash_kernel_key(cls, n, r, deg, Ms, H, remesh): s = f"{n}_{r}_{deg}_{Ms}_{H}_{int(remesh)}" return hashlib.sha256(s.encode("utf-8")).hexdigest()
[docs] @classmethod def cache_file(cls): _cache_dir = IO.cache_path() + "/numerics" _cache_file = _cache_dir + "/remesh.pklz" return _cache_file
def __eq__(self, other): if not isinstance(other, ExplicitRungeKutta): return NotImplemented eq = self.split_polys == other.split_polys for var in self.varnames: eq &= getattr(self, var) == getattr(other, var) return eq def __ne__(self, other): eq = self == other if isinstance(eq, ExplicitRungeKutta): return NotImplemented return not eq def __hash__(self): h = self.hash_kernel_key(self.n, self.r, self.deg, self.Ms, self.H, self.remesh) return hash((h, self.split_polys)) def _build(self, verbose, split_polys): # polynom symbolic variables x = sm.Symbol("x") t = sm.Symbol("t") # usefull vars Ms = self.Ms deg = self.deg P = self.P self.Px = P if verbose: print(" => Substitution in Polynomials") for Pix in P: print(" ", Pix.all_coeffs()[::-1]) print() for Pix in P: print(" ", sm.horner(Pix)) print() # split polynomials X = np.arange(-Ms, +Ms + 1) if split_polys: Pt_l = [] Pt_r = [] Pt_L = [] Pt_R = [] Cl = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) Cr = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) CL = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) CR = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) for i, Pix in enumerate(P): Pit_l = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: t + X[i]}), t, domain="QQ" ) Pit_r = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: X[i] + 1 - t}), t, domain="QQ" ) Pit_L = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: +1 - t + X[i]}), t, domain="QQ" ) Pit_R = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: X[i] - t}), t, domain="QQ" ) Cl[:, i] = np.asarray(Pit_l.all_coeffs(), dtype=np.float64) Cr[:, i] = np.asarray(Pit_r.all_coeffs(), dtype=np.float64) CL[:, i] = np.asarray(Pit_L.all_coeffs(), dtype=np.float64) CR[:, i] = np.asarray(Pit_R.all_coeffs(), dtype=np.float64) Pt_l.append(Pit_l) Pt_r.append(Pit_r) Pt_L.append(Pit_L) Pt_R.append(Pit_R) self.Cl = Cl self.Cr = Cr self.CL = CL self.CR = CR self.Pt_l = Pt_l self.Pt_r = Pt_r self.Pt_L = Pt_L self.Pt_R = Pt_R if verbose: print(" => Splitting polynomials") for p in Pt_l: print(" ", p.all_coeffs()) print() for p in Pt_r: print(" ", p.all_coeffs()) print() print() for p in Pt_l: print(" ", sm.horner(p)) print() for p in Pt_r: print(" ", sm.horner(p)) else: Pt = [] Pt_L = [] C = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) CL = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64) for i, Pix in enumerate(P): Pit = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: t + X[i]}), t, domain="QQ" ) Pit_L = sm.polys.polytools.poly( Pix.as_expr().xreplace({x: 1 - t + X[i]}), t, domain="QQ" ) C[:, i] = np.asarray(Pit.all_coeffs(), dtype=np.float64) CL[:, i] = np.asarray(Pit_L.all_coeffs(), dtype=np.float64) Pt.append(Pit) Pt_L.append(Pit_L) self.Pt = Pt self.Pt_L = Pt_L self.C = C self.CL = CL if verbose: print(" => Substituing x = t+i") for Pit in Pt: print(" ", Pit.all_coeffs()[::-1]) print() for Pit in Pt: print(" ", sm.horner(Pit)) print() if verbose: print(" => Generating lambdas") imin = -Ms imax = +Ms if split_polys: gamma_l = sp.interpolate.PPoly(Cl, X, extrapolate=False) gamma_r = sp.interpolate.PPoly(Cr, X, extrapolate=False) def gamma(x): i = np.floor(x) z = x - i res = (z >= 0.5) * gamma_r(i + 1 - z) + (z < 0.5) * gamma_l(x) res[np.isnan(res)] = 0.0 return res else: gamma_ = sp.interpolate.PPoly(C, X, extrapolate=False) def gamma(x): res = gamma_(x) res[np.isnan(res)] = 0.0 return res self.I = X self.gamma = gamma if split_polys: self.gamma_l = gamma_l self.gamma_r = gamma_r self.poly_splitted = True else: self.poly_splitted = False if verbose: print(" All done.") def _register(self, dic): key = self.hash_kernel_key( self.n, self.r, self.deg, self.Ms, self.H, self.remesh ) update_cache(self.cache_file(), key, dic) def __call__(self, x): Ms = self.Ms res = self.gamma(x) return res
[docs] class SymmetricKernelGenerator: """ Generate a symmetric piecewise polynomial that preserves the first discrete moments of a given symmetric stencil H. If H is None, an interpolation stencil is generated. """ SINGULAR_SYSTEM = {} def __init__(self, verbose=False): self.verbose = verbose self.configured = False
[docs] def configure(self, n, H=None): if n == 1: # For linear kernel L1_0 Ms = 1 H = np.zeros(2, dtype=int) H[Ms] = 1 self.remesh = True self.H = H self.Mh = None self.n = n self.Ms = Ms self.configured = True return self assert n > 0 and n % 2 == 0, "n not even or n<=0." Ms = n // 2 + 1 if H is None: H = np.zeros(2 * Ms + 1, dtype=int) H[Ms] = 1 self.remesh = True else: H = mpqize(np.asarray(H)) assert H.size == 2 * Ms + 1 self.remesh = False self.H = H if self.remesh: self.Mh = None else: Mh = [] for q in range(n + 1): mq = mpq(0) for i, h in enumerate(H, start=-Ms): mq += (i**q) * h Mh.append(mq) self.Mh = Mh self.n = n self.Ms = Ms self.configured = True return self
[docs] def solve(self, r, override_cache=False, split_polys=False, no_wrap=False): assert self.configured, "SymmetricKernelGenerator has not been configured." assert r >= 0, "r<0." deg = 2 * r + 1 n = self.n Ms = self.Ms H = self.H Mh = self.Mh remesh = self.remesh verbose = self.verbose if no_wrap: cls = dict else: cls = Kernel if verbose: print( "\nSymmetricKernelGenerator was called with the following parameters:" ) print(f" n = {n} (preserved discrete moments)") print(f" r = {r} (overall kernel regularity)") print(f" Ms = {Ms} (piecewise polynomial count)") print(f" deg = {deg} (local polynomial regularity)") print() print(" H = [" + ",".join([h.__str__() for h in H]) + "]") if not self.remesh: print(" Mh = [" + ",".join([m.__str__() for m in Mh]) + "]") print() # NG 28 august 2024: H = mpqize(np.asarray(H)) # This solution is ok for Interger values, but not for Real values. # Bug or restriction concerning sm.Rational(f2q()). # micromamba gmpy2 is an old version 2.1.5 in august 2024 (latest=2.2.1) H = mpqize(np.asarray(H)) if not self.remesh: Mh = mpqize(np.asarray(Mh)) # check if ke rnel was not already cached key = Kernel.hash_kernel_key(n, r, deg, Ms, H, remesh) if override_cache: if verbose: print(" Cache overwrite requested.") else: if verbose: print(" Checking if kernel was already computed... ", end=" ") data = load_data_from_cache(Kernel.cache_file(), key) if data is not None: if verbose: print("True") print(" Loading kernel from cache.") if data == self.SINGULAR_SYSTEM: raise RuntimeError("Could not solve linear system.") kernel = cls( verbose=verbose, register=False, split_polys=split_polys, **data ) return kernel elif verbose: print("False") print(" Building linear system...") # polynom symbolic variable x = sm.Symbol("x") # build Ms*(deg+1) symbolic coefficients (polynomial unknowns) coeffs = [] for k in range(Ms): coeffs.append([sm.symbols(f"C{k}_{d}") for d in range(deg + 1)]) # build discrete moments rhs values M = [] for i in range(n + 1): Mi = [] for j in range(deg + 1): if remesh and i == j: Mij = sm.Rational(1) elif not remesh and i - j >= 0 and Mh[i - j] != 0: Mij = sm.symbols(f"M{i}_{j}") else: Mij = 0 Mi.append(Mij) M.append(Mi) # build the Ms polynomials Pp = [] Pm = [] for i, C in enumerate(coeffs): pexpr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer mexpr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer for d, c in enumerate(C): pexpr += c * (x**d) mexpr += c * ((-x) ** d) Pp.append(sm.polys.polytools.poly(pexpr, x)) Pm.append(sm.polys.polytools.poly(mexpr, x)) P = Pm[::-1] + Pp # precompute the r first polynomial derivatives dPs = [] for p in Pp: dP = [p] for i in range(r): p = p.diff() dP.append(p) dPs.append(dP) # Build overdetermined system of equations eqs = [] # Continuity equations (Gamma is Cr continuous) # Parity in x=0 -- Gamma is EVEN -> (r+1)//2 eqs for d in range(1, r + 1, 2): eq = coeffs[0][d] # =0 eqs.append(eq) # Right-most point, zero derivatives -> (r+1) eqs for d in range(r + 1): eq = dPs[-1][d].xreplace({x: sm.Integer(Ms)}) # =0 eqs.append(eq.as_expr()) # Cr-continuity on inner points -> (Ms-1)*(r+1) eqs for d in range(r + 1): for i in range(Ms - 1): eq = dPs[i][d].xreplace({x: sm.Integer(i + 1)}) - \ dPs[i + 1][d].xreplace({x: sm.Integer(i + 1)}) # = 0 eqs.append(eq.as_expr()) # Interpolation condition on the left -> Ms equations for i in range(Ms): # NG 3 sep 2024: eq = Pp[i].xreplace({x: sm.Integer(i)}) - H[Ms + i] eq = Pp[i].xreplace({x: sm.Integer(i)}) - \ sm.Rational(sm.Integer(H[Ms + i].numerator), \ sm.Integer(H[Ms + i].denominator)) eqs.append(eq.as_expr()) # Discrete moments s = sm.symbols("s") for m in range(0, n + 1): expr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer for l in range(-Ms + 1, Ms + 1): if m > 0 and l == 0: continue i = Ms - l # NG 28 august 2024 => e = P[i].xreplace({x: s - f2q(l)}).as_expr() e = P[i].xreplace({x: s - sm.Integer(l)}).as_expr() if m > 0: # NG 28 august 2024 => e *= f2q(l**m) e *= sm.Rational(l**m) expr += e Pm = sm.polys.polytools.poly(expr, s) for i, Cmi in enumerate(Pm.all_coeffs()[::-1]): # NG 28 august 2024 => eqs.append( Cmi - sm.Rational(M[m][i]) ) eqs.append( Cmi - sm.Rational(M[m][i]) ) if verbose: print(" => System built.") unknowns = [c for cl in coeffs for c in cl] + [ m for ml in M for m in ml if isinstance(m, sm.Symbol) ] if verbose: print(" Unknowns: ", unknowns) sol = sm.solve(eqs, unknowns) if len(sol) != len(unknowns): if verbose: print("sol=", sol) update_cache(Kernel.cache_file(), key, self.SINGULAR_SYSTEM) raise RuntimeError("Could not solve linear system.") elif verbose: print(" => System solved.") for k in sorted(sol.keys(), key=lambda x: x.name): print(f" {k}: {sol[k]}") for i, Pix in enumerate(P): P[i] = Pix.xreplace(sol) # 28 august 2024: change H=mpqize(np.asarray(H)) => H = np.asarray(H) # micromamba gmpy2 version is to old ! assert P[i].domain==sm.QQ,"Polynoms are not rational!!!" kernel = cls( n=n, r=r, deg=deg, Ms=Ms, H=mpqize(H), Mh=Mh, remesh=remesh, P=P, register=True, verbose=verbose, split_polys=split_polys, ) return kernel
if __name__ == "__main__": from hysop.numerics.stencil.stencil_generator import StencilGenerator from matplotlib import pyplot as plt verbose = True sg = StencilGenerator() sg.configure(dim=1, derivative=2) for i in range(1, 5): p = 2 * i H = sg.generate_exact_stencil(order=p - 1, origin=i) print(H.coeffs) assert H.is_centered() H = [0] + H.coeffs.tolist() + [0] kg = SymmetricKernelGenerator(verbose).configure(p, H=H) kernels = [] for r in [1, 2, 4, 8]: try: kernels.append(kg.solve(r, override_cache=False)) except RuntimeError: print(f"Solver failed fro p={p} and r={r}.") if len(kernels) == 0: continue continue k0 = kernels[0] fig = plt.figure() plt.xlabel(r"$x$") plt.ylabel(r"$\Lambda_{" + "{},{}".format(p, "r") + "}$") X = np.linspace(-k0.Ms - 1, +k0.Ms + 1, 1000) s = plt.subplot(1, 1, 1, label=i) for i, k in enumerate(kernels): s.plot(X, k(X), label=r"$\Lambda_{" + f"{p},{k.r}" + "}$") s.plot(k0.I, k0.H, "or") axe_scaling = 0.10 ylim = s.get_ylim() Ly = ylim[1] - ylim[0] s.set_ylim(ylim[0] - axe_scaling * Ly, ylim[1] + axe_scaling * Ly) s.legend() plt.show(block=True) # plt.savefig('out/gamma_{}_r'.format(p))